/**
 * lsv_wbuffer.h   provides the write buffer
 * function:   1: cache written pages in memory
 * 	       2: persist the pages to disk
 * author:     weizheng
 * date:       20170304
 * */
#ifndef __LSV_WBUFFER__
#define __LSV_WBUFFER__

/**
 * @file Write Buffer.
 * @author gj
 *
 * 承接用户IO流，满足顺序一致性要求。
 *
 * 基本流程：
 *
 * IO写入WAL即可返回，同时更新一内存缓冲区，聚合成1M大小的chunk。
 * 限制内存缓冲区的内存使用量，最小可以采用双缓冲区，2M。
 *
 * 注意事项：
 * - WAL的LSN要与内存缓冲区的顺序保持一致，否则，影响创建快照，REDO等过程
 * - 快照点，不能放在一个IO的中间。否则，难以定义rollback的语义。
 * - 提交点，决定内存缓冲区中chunk的回收
 *
 * 主要模块：
 * - WAL2
 * - Wbuf + Index
 *
 * 其他模块：
 * - pipeline，log+bitmap流水线，提高处理的并发度
 * - qos：内存缓冲区的qos策略
 *
 * @todo 正确理解什么是顺序一致性。
 * @todo 主要的困难，是协程执行的不连续性。为了保序，需要对代码进行特别组织。 网络，IO，lock等会引起执行的不连续性。
 * @todo 预分配 (WAL + wbuf pages)
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>

#include "list.h"
#include "schedule.h"
#include "coroutine.h"
#include "token_bucket.h"

#include "lsv_lib.h"
#include "lsv_types.h"
#include "lsv_conf.h"
#include "lsv_volume.h"
#include "lsv_help.h"
#include "lsv_log.h"

#define HASH_BIT_LEN    (20)
#define WBUF_INDEX_SIZE (1 << HASH_BIT_LEN)
/*
 * Hash bucket for a byte offset: ">> 12" discards the offset inside a
 * 4096-byte page, and the mask keeps the low HASH_BIT_LEN bits of the page
 * number, giving a value in [0, WBUF_INDEX_SIZE).
 */
#define HASH_KEY(off)   (((off) >> 12) & (WBUF_INDEX_SIZE - 1))

/*organize the index of wbuffer*/
/*
 * One entry of the write-buffer index. Entries hang off an
 * index_block_head bucket (the global index in lsv_wbuf_t is documented as
 * "(hash(lba), index_block_head/list<index_block>)"); each
 * lsv_wbuf_segment_t embeds an array of them (hash_block_array).
 */
typedef struct __index_block {
        struct list_head hook;    // presumably links this entry into a bucket's list

        lsv_u64_t offset;    //lba

        void *segment;            // presumably the owning lsv_wbuf_segment_t — TODO confirm
        lsv_u32_t chunk_id;       // chunk inside the segment holding the page (assumed from naming)
        lsv_u32_t chunk_off;      // location of the page inside that chunk (assumed from naming)
} index_block;

/*
 * Head of one hash bucket of the write-buffer index: a list of
 * index_block entries (presumably chained through index_block::hook).
 */
typedef struct __index_block_head {
        struct list_head list;
} index_block_head;

// -- chunk

/*
 * One per-page metadata slot in page 0 of a chunk. The same storage is
 * viewed either as a log head or as an hlog record. The lsv_chunk_buf_t
 * layout comment ("4K = 256 * 16") implies each slot is 16 bytes — TODO
 * confirm against lsv_log.h.
 */
typedef union __lsv_log_meta {
        lsv_log_head_t head;
        lsv_log_hlog_t hlog;
} lsv_log_meta_t;

/*
 * One write-buffer chunk. The first LSV_WBUF_CHUNK_SIZE bytes (1M per the
 * layout comments) hold the chunk image: page 0 is the per-page metadata
 * (hlog_arg), and the remaining pages are data. In-memory bookkeeping
 * fields follow the image.
 */
typedef struct __lsv_chunk_buf {
        // {{ 1M
        lsv_log_meta_t hlog_arg[LSV_WBUF_PAGE_NUM];                        // page0 (4K = 256 * 16): one metadata slot per page
        lsv_s8_t       page_buf[LSV_WBUF_CHUNK_SIZE - LSV_WBUF_PAGE_SIZE]; // data pages 1..255: one page short of the chunk, because page0 is hlog_arg
        // }}

        // [0-255]: data pages used so far; 0 = empty (chunk_is_empty), LSV_WBUF_PAGE_NUM - 1 = full (chunk_is_full)
        int page_idx;              // must be here, struct offset is 1M
        int in_use;                // chunk state, e.g. LSV_WBUF_CHUNK_CLEAN / LSV_WBUF_CHUNK_POST_COMMIT
        int fill_count;            // filled pages; chunk_is_fill asserts fill_count <= page_idx
        uint64_t chunk_lsn;
        uint64_t chunk_psn;
        uint32_t id;
        // int idx;
} lsv_chunk_buf_t;

// -- wbuf segment: manages one or more chunks

/*
 * Write-buffer segment (used when WBUF_USE_CHUNK != 1): an array of chunk
 * buffers plus an index entry for every page slot of every chunk.
 */
typedef struct __lsv_wbuf_segment_t {
        struct list_head work_hook;    // presumably links into lsv_wbuf_t::work_list
        struct list_head ready_hook;   // presumably links into lsv_wbuf_t::ready_list

        // buffer
        lsv_chunk_buf_t *lsv_chunk;    // chunk array; segment_is_full treats it as LSV_WBUF_CHUNK_NUM entries
        lsv_u32_t chunk_idx;           // chunk currently being filled; LSV_WBUF_CHUNK_NUM means exhausted

        // LSV_WBUF_CHUNK_NUM * LSV_WBUF_PAGE_NUM entries: one per page slot of each chunk
        index_block hash_block_array[LSV_WBUF_CHUNK_NUM * LSV_WBUF_PAGE_NUM];
} lsv_wbuf_segment_t;

// -- like lsv_wbuf_segment_t, but manages a single chunk

struct __lsv_wbuf_chunk_t;

/*
 * Per-page record of a lsv_wbuf_chunk_t (one entry of its `pages` array):
 * which LBA the page holds and where it lives.
 */
typedef struct __chunk_page_t {
        struct list_head hook;                // list membership not visible here — TODO document

        lsv_u64_t lba;                        // logical block address held by this page
        struct __lsv_wbuf_chunk_t *chunk;     // chunk that contains this page
        uint32_t page_idx;                    // page slot inside `chunk`
} chunk_page_t;

/*
 * Single-chunk write-buffer unit, used by lsv_wbuf_t when
 * WBUF_USE_CHUNK == 1 (free_list / wait_list / last_chunk).
 */
typedef struct __lsv_wbuf_chunk_t {
        // for free/wait list
        struct list_head hook;

        uint32_t id;
        lsv_chunk_buf_t *chunk_buf;       // backing chunk buffer

        // for gc: one record per page slot of chunk_buf
        chunk_page_t pages[LSV_WBUF_PAGE_NUM];
} lsv_wbuf_chunk_t;

// -- WAL

/* One chunk of a WAL segment. */
typedef struct {
        struct list_head hook;
        uint32_t idx;          // index in chunks@lsv_wbuf_wal_segment_t
        uint32_t chunk_id;
        uint32_t next_page;    // next free page, 0: empty, 256: full
        uint64_t max_lsn;      // in ARIES; presumably the highest LSN logged in this chunk
} lsv_wbuf_wal_chunk_t;

/*
 * A WAL segment: up to WAL_SEGMENT_CHUNK_NUM WAL chunks, of which
 * curr_chunk selects the one currently in use.
 */
typedef struct {
        struct list_head hook;

        lsv_wbuf_wal_chunk_t *chunks[WAL_SEGMENT_CHUNK_NUM];
        int chunk_num;    // === IO_DEPTH
        int curr_chunk;   // index in array

        uint64_t max_lsn; // io max LSN
} lsv_wbuf_wal_segment_t;

/* WAL state of a write buffer: WAL chunk lists and their synchronization. */
typedef struct {
        count_list_t free_list;   // presumably WAL chunks available for new records
        count_list_t wait_list;   // presumably WAL chunks written and awaiting reuse — TODO confirm

        // for sync
        lsv_rwlock_t malloc_rwlock;
        co_cond_t full_cond;      // presumably waited on while the WAL has no free chunk
} lsv_wbuf_wal_t;

// -- pipeline

/*
 * Chunk-processing pipeline. The file header describes it as a "log +
 * bitmap" pipeline that raises processing concurrency. Chunks presumably
 * wait on log_queue for the log stage and on bitmap_queue for the bitmap
 * stage.
 */
typedef struct {
        // count_list_t list;
        count_list_t log_queue;
        count_list_t bitmap_queue;

        lsv_volume_proto_t *lsv_info;       // volume given to pipeline_init
        int status;
        int stage_bitmap_running;           // presumably set while the bitmap stage runs
        int is_too_many_log;                // presumably back-pressure flag for a long log queue
        co_cond_t too_many_log;             // presumably waited on while is_too_many_log is set

        // counters (exact semantics not visible in this header)
        uint64_t total_count;
        uint64_t log_count;
        uint64_t bitmap_count;

        uint64_t log_enter_count;

        uint32_t last_log_chunk_id;         // presumably the last chunk id seen by the log stage
        uint32_t last_bitmap_chunk_id;      // presumably the last chunk id seen by the bitmap stage
} pipeline_t;

/*
 * Progress of a chunk through the pipeline: before/after the log stage,
 * then before/after the bitmap stage. PIPELINE_CHUNK_MAX is the count
 * sentinel, not a real state.
 */
typedef enum {
        PIPELINE_CHUNK_INIT = 0,

        PIPELINE_CHUNK_PRE_LOG,
        PIPELINE_CHUNK_POST_LOG,

        PIPELINE_CHUNK_PRE_BITMAP,
        PIPELINE_CHUNK_POST_BITMAP,

        PIPELINE_CHUNK_MAX,
} pipeline_chunk_status_t;

/* A chunk travelling through a pipeline_t. */
typedef struct {
        struct list_head hook;         // presumably links into a pipeline queue
        struct list_head local_hook;
        pipeline_t *pipeline;          // pipeline this chunk belongs to
        lsv_chunk_buf_t *chunk;        // chunk buffer being processed
        lsv_log_proto_t log_proto;
        uint32_t chunk_id;
        int status;                    // presumably a pipeline_chunk_status_t value
        int ret;                       // presumably the result of the last stage
} pipeline_chunk_t;

/*
 * Write-buffer QoS statistics, maintained through lsv_wbuf_qos_init /
 * lsv_wbuf_qos_update / lsv_wbuf_qos_wait.
 * NOTE(review): struct timeval is declared in <sys/time.h>, which this
 * header does not include directly; it relies on a project header for it.
 */
typedef struct {
        struct timeval t1, t2;            // presumably the sampling window bounds
        uint64_t in_pc, in_pc2;           // inflow page counters (assumed from naming)
        uint64_t out_pc, out_pc2;         // outflow page counters (assumed from naming)
        double in_flow, out_flow;         // presumably computed inflow/outflow rates
        int64_t interval;                 // sampling period passed to lsv_wbuf_qos_init

        int64_t total_pages;
} lsv_wbuf_qos_t;

// for io_stream
/*
 * Queue entry referencing one IO and its data buffer. Both are const
 * pointers; the entry does not modify them.
 */
typedef struct __lsv_io_entry_t {
        struct list_head hook;
        const io_t *io;
        const buffer_t *buffer;
} lsv_io_entry_t;

/*
 * Per-volume write-buffer state. How the buffered pages are organized
 * depends on WBUF_USE_CHUNK:
 *   != 1: lsv_wbuf_segment_t segments on work_list / ready_list;
 *   == 1: lsv_wbuf_chunk_t chunks on free_list / wait_list.
 */
typedef struct __lsv_wbuf_t {
        // for IO
        co_mq_t io_mq;

        // WAL
        lsv_wbuf_wal_t *wal;

#if WBUF_USE_CHUNK != 1
        // wbuf: memory
        count_list_t work_list;
        count_list_t ready_list;

        // track access history (presumably the last written position)
        lsv_wbuf_segment_t *last_segment;
        int last_chunk_idx;
        int last_page_idx;
#else
        count_list_t free_list;
        count_list_t wait_list;

        lsv_wbuf_chunk_t *last_chunk;
        int last_page_idx;
#endif

        // global index: (hash(lba), index_block_head/list<index_block>)
        // presumably WBUF_INDEX_SIZE buckets addressed with HASH_KEY()
        index_block_head *wb_hash_index;
        lsv_rwlock_t wb_hash_rwlock;   // presumably guards wb_hash_index

        pipeline_t pipeline;
        co_mq_t wbuf2log_stream;

        lsv_wbuf_qos_t qos;

        // sync method
        uint64_t last_lsn;   // logical serial number (IO)
        uint64_t last_psn;   // page serial number

        // commit point; per the file header it decides when wbuf chunks can be reclaimed
        uint64_t commit_lsn;
        uint64_t commit_psn;

        uint64_t __last_lsn;
        uint64_t __last_psn;

        co_cond_t io_cond;
        co_cond_t full_cond;
        co_cond_t flush_cond;

} lsv_wbuf_t;

/*
 * Context pairing a volume with one chunk buffer, presumably used when a
 * chunk is handed from the wbuf to the log (name-based; TODO confirm).
 */
typedef struct {
        lsv_volume_proto_t *lsv_info;
        lsv_chunk_buf_t *chunk_buf;
} lsv_wbuf2log_ctx_t;

/*
 * States of an IO fragment, presumably stored in wbuf_io_frag_t::status:
 * prepared, waiting, then filled into the buffer (TODO confirm).
 */
typedef enum {
        WBUF_IO_FRAG_PREPARE = 0,
        WBUF_IO_FRAG_WAIT    = 1,
        WBUF_IO_FRAG_FILL    = 2,
} wbuf_io_frag_status;


/**
 * One page of a user IO after splitting, bound to a wbuf chunk.
 */
typedef struct {
        struct list_head hook;

        // IO-related attributes
        lsv_volume_proto_t *lsv_info;
        const io_t *io;
        buffer_t *buf;

        // per-page attributes, set when the IO is split into pages
        // {{
        uint64_t lba;
        uint64_t off;
        uint32_t size;

        lsv_wbuf_chunk_t *chunk;
        // uint32_t chunk_idx;
        // uint32_t chunk_id;
        uint32_t page_idx;     // page slot inside `chunk`
        uint32_t page_num;     // === 1
        uint32_t snap_id;
        // }}

        int is_fill;
        int wakeup;
        task_t task;

        co_cond_t cond;
        int status;            // presumably a wbuf_io_frag_status value — TODO confirm
        int __enter_count_fill;
        int __enter_count_write;
} wbuf_io_frag_t;

/*
 * A user IO plus its page fragments. `frags` is a trailing array that
 * must be allocated together with the head (presumably frag_num entries).
 * NOTE(review): `frags[0]` is the GNU zero-length-array extension; the
 * standard C99 spelling is a flexible array member `frags[]`.
 */
typedef struct {
        lsv_volume_proto_t *lsv_info;
        // io
        const io_t *io;
        buffer_t *buf;

        // raw io properties
        // uint64_t lsn;
        // uint64_t off;
        // uint32_t size;

        uint32_t frag_num;
        wbuf_io_frag_t frags[0];
} wbuf_io_head_t;

/**
 * Report whether a chunk counts as empty.
 *
 * A LSV_WBUF_CHUNK_CLEAN chunk is empty only while page_idx is 0. A
 * LSV_WBUF_CHUNK_POST_COMMIT chunk always counts as empty. Any other state
 * trips YASSERT and yields 0.
 *
 * @param chunk_buf chunk to inspect
 * @return 1 if empty, 0 otherwise
 */
static inline int chunk_is_empty(const lsv_chunk_buf_t *chunk_buf) {
        const int state = chunk_buf->in_use;

        if (state == LSV_WBUF_CHUNK_CLEAN) {
                return chunk_buf->page_idx == 0;
        }

        if (state == LSV_WBUF_CHUNK_POST_COMMIT) {
                return 1;
        }

        YASSERT(0);
        return 0;
}

/**
 * Report whether a chunk has no free data page left.
 *
 * Page 0 of a chunk holds the hlog_arg metadata. Under the 1M/4K layout of
 * lsv_chunk_buf_t, at most LSV_WBUF_PAGE_NUM - 1 data pages therefore fit,
 * and the chunk is full once page_idx reaches that count.
 *
 * @note In preallocation mode the chunk may be in any of its three in_use
 *       states, so no state is asserted here.
 *
 * @param chunk_buf chunk to inspect
 * @return 1 if full, 0 otherwise
 */
static inline int chunk_is_full(const lsv_chunk_buf_t *chunk_buf) {
        const int used_pages = chunk_buf->page_idx;

        return used_pages == LSV_WBUF_PAGE_NUM - 1;
}

/**
 * Report whether fill_count has caught up with page_idx. Presumably this
 * means every page reserved in the chunk has received its data.
 *
 * fill_count must never run ahead of page_idx; that invariant is asserted.
 *
 * @param chunk_buf chunk to inspect
 * @return 1 if page_idx == fill_count, 0 otherwise
 */
static inline int chunk_is_fill(const lsv_chunk_buf_t *chunk_buf) {
        // both counters are signed int, so %d is the matching conversion
        DINFO_NOP("fill_count %d page_idx %d\n", chunk_buf->fill_count, chunk_buf->page_idx);
        YASSERT(chunk_buf->fill_count <= chunk_buf->page_idx);
        return (chunk_buf->page_idx == chunk_buf->fill_count);
}

/**
 * Report whether a segment has no room for another page.
 *
 * The segment is full once chunk_idx has moved past its last chunk, or
 * when chunk_idx is on the last chunk and that chunk is full.
 *
 * @param segment segment to inspect
 * @return 1 if full, 0 otherwise
 */
static inline int segment_is_full(const lsv_wbuf_segment_t *segment) {
        // ">=" rather than "==": an index at or past the end must never be
        // reported as "not full", or it could be used to index lsv_chunk
        // out of range.
        if (segment->chunk_idx >= LSV_WBUF_CHUNK_NUM) {
                return 1;
        }

        return (segment->chunk_idx == LSV_WBUF_CHUNK_NUM - 1) &&
                chunk_is_full(&segment->lsv_chunk[segment->chunk_idx]);
}

/* Position and size of one page-level write (see lsv_wbuffer_append_page). */
typedef struct {
        uint64_t lsn;
        uint64_t lba;
        uint32_t size;
} lsv_io_opt_t;

/**
 * Fill in an lsv_io_opt_t.
 *
 * @param io_opt destination, overwritten
 * @param lsn    logical serial number of the IO
 * @param lba    logical block address
 * @param size   size in bytes (assumed unit)
 */
static inline void lsv_io_opt_init(lsv_io_opt_t *io_opt, uint64_t lsn, uint64_t lba, uint32_t size) {
        *io_opt = (lsv_io_opt_t) {
                .lsn  = lsn,
                .lba  = lba,
                .size = size,
        };
}

/*
 * Result of lsv_wbuffer_check_io_write_pre (passed as its `pre` argument):
 * separate readiness flags for the write buffer and the WAL.
 */
typedef struct __check_io_write_pre_t {
        int wbuf_is_ok;
        int wal_is_ok;
} check_io_write_pre_t;

/**
 * Set up the write buffer of a volume.
 *
 * @param lsv_info volume
 * @param flag     meaning not visible in this header — TODO document
 */
int lsv_wbuffer_init(lsv_volume_proto_t *lsv_info, lsv_u32_t flag);

/** Release the write buffer set up by lsv_wbuffer_init. */
int lsv_wbuffer_destroy(lsv_volume_proto_t *lsv_info);

/**
 * @brief Persist everything in the wbuf and the pipeline queues.
 *
 * New IO must be blocked meanwhile (a read lock is taken).
 *
 * Basic flow:
 * - block new IO
 * - wait for all IO already received to complete
 * - flush wbuf
 * - flush pipeline
 * - OK
 *
 * @param lsv_info
 * @return
 */
int lsv_wbuffer_flush(lsv_volume_proto_t *lsv_info);

/* Presumably lsv_wbuffer_flush without taking the lock itself (caller holds it) — TODO confirm. */
int lsv_wbuffer_flush_unlock(lsv_volume_proto_t *lsv_info);

/* Lock-free-variant counterpart for cleaning the wbuf (name-based; TODO document). */
int lsv_wbuffer_clean_unlock(lsv_volume_proto_t *lsv_info);

/**
 * @brief REDO WAL
 *
 * Replays the write-ahead log (presumably when the volume is loaded).
 *
 * @param lsv_info
 * @return
 */
int lsv_wbuffer_load(lsv_volume_proto_t *lsv_info);

/**
 * Pre-write check for `io` (name suggests admission control; TODO confirm).
 *
 * @param lsv_info
 * @param io
 * @param pre presumably filled with per-component readiness
 *            (wbuf_is_ok / wal_is_ok)
 * @return 1 success
 * @return 0 fail
 */
int lsv_wbuffer_check_io_write_pre(lsv_volume_proto_t *lsv_info, const io_t *io, check_io_write_pre_t *pre);

/* Presumably the waiting counterpart of lsv_wbuffer_check_io_write_pre — TODO confirm. */
int lsv_wbuffer_wait_io_write_pre(lsv_volume_proto_t *lsv_info, const io_t *io);

/**
 * Append one page to the write buffer.
 *
 * @param lsv_info
 * @param io_opt   lsn / lba / size of the page (see lsv_io_opt_init)
 * @param page_buf page data
 * @return > 0 size
 * @return < 0 error
 */
int lsv_wbuffer_append_page(lsv_volume_proto_t *lsv_info, const lsv_io_opt_t *io_opt, buffer_t *page_buf);

/**
 * Read a range from the write buffer, appending the data to append_buf.
 *
 * @param lsv_info
 * @param offset
 * @param size
 * @param append_buf
 * @return > 0 size
 * @return <= 0 error
 */
int lsv_wbuffer_page_read(lsv_volume_proto_t *lsv_info, lsv_u64_t offset, lsv_u32_t size, buffer_t *append_buf);

/* Initialize `pipeline` for the volume lsv_info. */
int pipeline_init(pipeline_t *pipeline, lsv_volume_proto_t *lsv_info);

/* Queue a chunk at the tail of the pipeline. */
int pipeline_add_tail(pipeline_t *pipeline, lsv_chunk_buf_t *chunk);
/* Run the log stage (name-based; TODO document). */
int pipeline_do_log(pipeline_t *pipeline);

/**
 * Initialize write-buffer QoS statistics.
 *
 * @param qos
 * @param interval sampling period
 * @return
 */
int lsv_wbuf_qos_init(lsv_wbuf_qos_t *qos, int64_t interval);

/**
 * Compute how long an incoming IO should be delayed.
 *
 * @param qos
 * @param watermark current watermark (buffer fill level)
 * @param in size of the current IO, in pages
 * @param delay out: computed sleep time; <= 0 means no sleep is needed
 * @return
 */
int lsv_wbuf_qos_wait(lsv_wbuf_qos_t *qos, int watermark, int in, int64_t *delay);

/**
 * Record traffic for QoS statistics.
 *
 * @param qos
 * @param in inflow, in pages
 * @param out outflow, in pages
 * @return
 */
int lsv_wbuf_qos_update(lsv_wbuf_qos_t *qos, int in, int out);

#endif
